多线程七 AQS

一 . 简介AQS

AQS简介

  • 在同步组件的实现中,AQS是核心部分,同步组件的实现者,通过使用AQS提供的模板方法 实现同步组件语义
  • AQS实现了对同步状态的管理以及阻塞线程进行排队,等待通知等等一系列底层的实现处理
  • AQS核心:使用Node实现同步队列,底层是个双向链表,可以用于同步锁或者其他同步装的基础框架

AbstractQueuedSynchronized,虽然类名开头是Abstract,但是他不是抽象类,意义就是说,单独使用它是没有意义的,依赖他去实现同步组件才有意义–相当于没模板方法模式

  • 子类通过继承并实现他的方法,管理其状态 acquire和release
  • 可以同时实现排它锁和共享锁,站在使用者的角度看,它可以帮我们完成两件事,独占控制和共享控制,它的所有子类中要么实现重写了它独占功能的API,要么使用的是共享功能的API,而不会同时使用两套API,即使是他最有名的实现类ReentrantLock,也是通过两个内部类,分别使用者两套API

AQS实现的大致思路,它内部有一个双向的链表,链表的每一个节点都是一个Node的结构,线程会来尝试的获取锁,如果失败了那么它就将当前线程包装成一个Node节点,加入到同步队列中,前一个节点释放锁后,唤醒自己的后继节点,它实现的依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器
此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础

以下类都是依赖AQS是实现的:

  • ReentrantLock
  • ReentrantReadWritrLock.ReadLock
  • ReentrantReadWriteLock.WriteLock

二 . 使用AQS实现自己的锁

自己的锁肯定要去实现lock接口,重写里面的方法,怎么重写呢?使用AQS,将AQS作为内部的帮助器类,重写里面的tryAcquir和tryRelease方法,因为lock()我们采用帮助器acquire的方法实现,而此方法会至少调用一次tryAcquire,同理,释放锁,我们重写帮助器的tryRelease,,,,我们只是简单的使用一下,完成加锁,释放锁,锁重入即可
他面临着两个问题

  1. 怎么知道来拿锁的线程是上一个拿到锁的线程

    • 判断if(当前线程==持有锁的线程){state++;}要求计数器自增
  2. 怎么释放掉锁

    • 重复n次拿到了锁,要求计数器依次减下去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

public class AQSDemo01 implements Lock {
private Helper helper = new Helper();

private class Helper extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
int state = getState();
Thread t = Thread.currentThread();

if (state == 0) {
//如果当前状态的值等于预期的值,就把 当前状态的中修改成 arg的值...... ( 刚才掉坑了, compareAndSerState方法,会帮助我们去对比 手动输进去的0 和 当前的状态)
if (compareAndSetState(0, arg)) {
System.out.println("线程来了"+Thread.currentThread().getName()+" arg=="+arg);
setExclusiveOwnerThread(t);
return true;
}
} else if (getExclusiveOwnerThread() == t) {
setState(state + 1);
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {

if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new RuntimeException();
}
int state = getState() - arg;

boolean flag = false;

if (state == 0) {
setExclusiveOwnerThread(null);
flag = true;

}
setState(state);
return flag;
}


Condition newCondition() {
return new ConditionObject();
}
}

@Override
public void lock() {
helper.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
helper.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return helper.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return helper.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
helper.release(1);
}

@Override
public Condition newCondition() {
return helper.newCondition();
}
}

测试类如下,锁正常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class textAQS {
private int value=0;
private AQSDemo01 lock = new AQSDemo01();
public int next() {
lock.lock();

try {
Thread.sleep(300);
return value++;
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException();
} finally {
lock.unlock();
}
}

/*
* 经典的验证 锁的重复问题 ,在单一的线程下, a() 想在 未释放锁 的前提下 调用b(),前提就是可冲入锁
* */
public void a() {
lock.lock();
System.out.println("a");
b();
lock.unlock();
}

public void b() {
lock.lock();
System.out.println("b");
lock.unlock();
}

public static void main(String[] args) {

textAQS m = new textAQS();
//测试可重入
new Thread(new Runnable() {

@Override
public void run() {
m.a();
}
}).start();

System.out.println("主线程=="+Thread.currentThread().getName());

ExecutorService executorService = Executors.newCachedThreadPool();


//从线程池拿出四条线程执行next任务,查看结果是否同步,同步
for (int i=0;i<4;i++){
executorService.execute(new Runnable() {
@Override
public void run() {
while(true)
System.out.println(Thread.currentThread().getName()+" "+m.next());
}
});

}

}

}


AQS的同步组件

1. CountDownLatch

  • 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
    所以在当前计数到达零之前,await 方法会一直受阻塞。当调用了一定次数的CountDown()是计数器的值为零后,会释放所有等待的线程
    ,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置
  • 它的典型使用场景就是分布计算
  • 打个比方

    课代表等所有同学交完作业再交给老师,
    1: 课代表等待所有的同学(线程)交作业 CountDownLatch cdl = new CountDownLatch(int 学生数)
    2: 单个学生交完作用, cdl.countDown() –> 学生数减一
    3: 主线程: cdl.await() 只要学生数不为零, 就等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class countDownLatch02 {
private static int []nums;

public countDownLatch02(int line){
nums=new int[line];
}

//分隔字符串数组,完成 当前行 ( 一行 ) 相加
public void colculate(String s ,int index,CountDownLatch count){
System.out.println("单行线程开始执行.. "+Thread.currentThread().getName());
String[] s1 = s.split(",");
int total=0;
for (String s2:s1) {
int i = Integer.parseInt(s2);
total+=i;
}
nums[index]=total;

System.out.println(Thread.currentThread().getName() +"线程 计算结果是=="+total);
count.countDown();
}


//分别计算没行的总值
public void sum(){
System.out.println("加总线程开始执行...");
int total =0;
for(int i=0;i<nums.length;i++){
total+=nums[i];
}
System.out.println("执行的结果是=="+total);
}


public static void main(String[] args) throws IOException, InterruptedException {

// 根据行数,
List<String> contents = readFile();
int size= contents.size();
CountDownLatch c = new CountDownLatch(size);

countDownLatch02 latch = new countDownLatch02(size);

System.out.println("zhuxianc");
// //创建出相应数目的线程,
for(int i=0;i<size;i++){
final int j =i;
new Thread(new Runnable() {
@Override
public void run() {
latch.colculate(contents.get(j),j,c);
}
}).start();
}
//在主线程中加总
// System.out.println("当前活跃的实现数"+Thread.activeCount());
/* while((Thread.activeCount())>2){ //自旋,等待其他线程执行完..
System.out.println("当前活跃的实现数"+Thread.activeCount());
}*/
c.await();
latch.sum();
}

/*
* 读取文件,将每一行存放进list数组...
* */
public static List<String> readFile() throws IOException {
List<String> list = new ArrayList<>();
String line=null;
BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\SETextMaven\\textcountDownLatch.txt"));
while ((line = bufferedReader.readLine())!=null){
list.add(line);
}
return list;
}

}

2. Semaphore

  • 常常作用于仅能提供有限访问的资源,比如项目中使用到的数据库的连接数,可能最大只有20,但是外界的并发量却很庞大,所以我们可以使用Semaphore进行控制,当它把并发数控制到1时,和单线程很相似
  • 他可以很容易的控制同一时刻,并发访问某一个资源被的线程数,使用起来也很简单,对需要进行并发控制的代码用 semaphore.acquire()和 semaphore.release(); 包裹起来即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* 字面意思: 信号量 --> Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目
* 作用: 用来控制同时访问某些特定资源的线程数量,协调各个线程合理使用公共资源
* 简介: Semaphore 可以用来维护当前访问自身的线程个数,并提供了同步机制,比如实现一个文件的允许的并发数
* 应用场景:
* 开启 30条线程 把 一万个文件的内容读取到内存
* 使用Semaphore允许10个线程可以并发执行,将内存中的数据写回数据库
*
* 模拟高并发
* */
public class semaphore {
public static void main(String[] args) {
final int tNum =30;
// ExecutorService executorService;
// executorService = new Executors.newFixedThreadPool();
Semaphore semaphore = new Semaphore(5);//允许一次性允许 并发执行的线程数

for (int i=0;i<100;i++){
new Thread(()->{
try {
semaphore.acquire(); //当前线程获取 Semaphore 的 许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"开始任务");

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

semaphore.release(); // 归还许可证

}).start();
}
}
}

3 . CyclicBarrier

  • 和CountDownLanch不同的是,它描述的是所有的线程相互等待的过程
    构造方法,参数为parties - 在启动 barrier 前必须调用 await() 的线程数
  • 注意点,如果 传入的参数为6,而所有线程一共才五条,那么主线程和 子线程,将永远处于等待状态,因为没有第六条线程执行 await方法… 或者, 线程在执行await()之前,出现异常, 屏障永远不会被满足

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
 
/*
* 它允许一组线程相互等待,直到达到某个公共的屏障点,
* 也就是说,所有的线程必须相互等待,--> 直到所有线程都 满足屏障的要求--> 执行后面的任务
*
* 开会:
* await() 屏障 后续的任务
* 公司里的所有人都去开会,--> 先到的人等待迟到的人--> 人到齐了,开会....
* */

/*
* 应用场景,多线程计算数据,最后合并计算结果
* */

/*
* 模拟开会...
* */

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrier01 {

public void meeting(CyclicBarrier cyclicBarrier){
System.out.println(Thread.currentThread().getName()+"到达会议室...");

try {
cyclicBarrier.await(); //此线程等待..
System.out.println(Thread.currentThread().getName()+"准备开会..");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {

CyclicBarrier01 c = new CyclicBarrier01();
// 构造函数1:
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,
// 但它不会在启动 barrier 时执行预定义的操作。
//参数:
//parties - 在启动 barrier 前必须调用 await() 的线程数
//抛出:
//IllegalArgumentException - 如果 parties 小于 1
//注意点:
// 如果 传入的参数为6,而所有线程一共才五条,那么主线程和 子线程,将永远处于等待状态,因为没有第六条线程执行 await方法...
// 或者, 线程在执行await()之前,出现异常, 屏障永远不会被满足
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
for (int i =0;i<5;i++) { //开启五条线程...
new Thread(new Runnable() {
@Override
public void run() {
c.meeting(cyclicBarrier);
}
}).start();
}


// 主线程
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("人都到齐了,开会...");

// cyclicBarrier.reset();
}
}

```

带 Runable, 当所有预期的线程都await后,先执行Runable里面的任务
```java
public class CyclicBarrier02 {

public void meeting(CyclicBarrier cyclicBarrier){
System.out.println(Thread.currentThread().getName()+"到达会议室...");

try {
cyclicBarrier.await(); //此线程等待..
System.out.println(Thread.currentThread().getName()+"听领导讲话...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {

CyclicBarrier02 c = new CyclicBarrier02();
// 构造函数2:
/*
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,
并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

参数:
parties - 在启动 barrier 前必须调用 await() 的线程数
barrierAction - 在启动 barrier 时执行的命令;如果不执行任何操作,则该参数为 null
抛出:
*/

CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new Runnable() {
@Override
public void run() {

System.out.println("开始开会...");
}
});
for (int i =0;i<5;i++) { //开启五条线程...
new Thread(new Runnable() {
@Override
public void run() {
c.meeting(cyclicBarrier);
}
}).start();
}
// 主线程
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("主线程也 await了 ... 人都到齐了,开会...");
cyclicBarrier.reset();

}
}

4. J.U.C同步组件 FutrueTask

在 多线程二 基本技能中有详细的将讲解,使用

5 J.U.C 同步组件Fork/Join框架

  • ForkJoin是java7提供的并行执行任务的框架
  • 它的设计思路是把一个大人物Fork成若干个小任务分布计算,然后Join这些子任务的结果,最终得到这个大任务的结果,使用工作窃取算法也就是某个线程从其他的线程的工作队列(双端队列,来窃取的线程从这个队列的尾部取任务,减少竞争)里面窃取任务执行

局限性:

  • 假如说双端任务队列里面就一个任务,那么肯定就出现竞争,而且还有多开辟线程的开销
  • 只能使用Fork和Join去同步,如果使用了别的同步机制.那么同步线程就不能去窃取执行其他任务
  • 工作队列里面的任务不应该是IO操作
  • 任务不能抛出检查异常,它必须通过必要的代码去处理他们